package com.opencee.cloud.msg.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.opencee.cloud.msg.api.constatns.*;
import com.opencee.cloud.msg.api.entity.MsgMessageContentEntity;
import com.opencee.cloud.msg.api.entity.MsgMessageReadEntity;
import com.opencee.cloud.msg.api.entity.MsgMessageTemplateEntity;
import com.opencee.cloud.msg.api.vo.DelayedMessage;
import com.opencee.cloud.msg.api.vo.MessageInboxResult;
import com.opencee.cloud.msg.api.vo.MsgChannelConfigVO;
import com.opencee.cloud.msg.api.vo.params.PushSingleParams;
import com.opencee.cloud.msg.api.vo.params.TopicMessage;
import com.opencee.cloud.msg.api.vo.params.TopicParams;
import com.opencee.cloud.msg.config.PushProperties;
import com.opencee.cloud.msg.config.WechatProperties;
import com.opencee.cloud.msg.mq.QueueConfiguration;
import com.opencee.cloud.msg.service.MessageContentService;
import com.opencee.cloud.msg.service.MessageReadService;
import com.opencee.cloud.msg.service.MsgTaskPushRecordsService;
import com.opencee.cloud.msg.service.MsgTemplateService;
import com.opencee.cloud.msg.service.impl.push.GeTuiSenderServiceImpl;
import com.opencee.cloud.msg.service.impl.push.JPushSenderServiceImpl;
import com.opencee.cloud.msg.utils.MqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * Implementation of the message push service.
 *
 * @author yadu
 */
@Slf4j
@Service
public class PushServiceImpl {
    /**
     * Number of processors reported by the runtime when this bean is created.
     * Used only to size the worker pool below.
     */
    private Integer availableProcessors = Runtime.getRuntime().availableProcessors();
    /** Worker pool size: twice the number of available processors. */
    private Integer numOfThreads = availableProcessors * 2;
    /**
     * Worker pool that batchSend submits its batch tasks to.
     * Core and maximum size are both numOfThreads (not a fixed 100, as the previous
     * comment stated) and the work queue is an unbounded LinkedBlockingDeque.
     * NOTE(review): nothing in this class shuts the pool down — confirm that is intended.
     */
    private ExecutorService executorService = new ThreadPoolExecutor(numOfThreads, numOfThreads, 0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
    /**
     * Maximum number of receivers assigned to one batch task in batchSend.
     */
    private static final int SIZE = 500;

    // Sender used when the configured push channel is "jpush".
    @Autowired
    private JPushSenderServiceImpl jPushService;
    // Sender used for every other configured push channel.
    @Autowired
    private GeTuiSenderServiceImpl gPushPushService;
    // Persists and loads message content.
    @Autowired
    private MessageContentService messageContentService;
    // Persists and loads push records.
    @Autowired
    private MsgTaskPushRecordsService messagePushRecordsService;
    // NOTE(review): same type as messagePushRecordsService, yet batchSend uses it to save
    // per-platform detail rows — confirm whether a dedicated details service was intended.
    @Autowired
    private MsgTaskPushRecordsService messagePushDetailsService;
    // Supplies the APP push switch (isEnabled) and the channel name (getChannel).
    @Autowired
    private PushProperties pushProperties;
    // Supplies the WeChat push switch (isEnabled); only read for debug logging here.
    @Autowired
    private WechatProperties wechatProperties;
    // Persists per-receiver inbox entries.
    @Autowired
    private MessageReadService messageInboxService;
    // Looks up message templates by template code.
    @Autowired
    private MsgTemplateService msgTemplateService;
    // AMQP client handed to MqUtil for topic and delayed messages.
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Validates a single push request, stores the message content and its push record, and
     * then either dispatches the push immediately or schedules it on the delay queue.
     *
     * <p>Validation and template-resolution failures are not propagated. They are logged and
     * the push record is saved with state {@code ERROR} and the exception message as remark.
     * Only a record that is still {@code UNSENT} after saving is dispatched:
     * <ul>
     *   <li>no delay time, or a delay time that is not in the future: sent right away;</li>
     *   <li>a delay time later today: handed to {@link #addDelayQueue};</li>
     *   <li>a delay time on a later day: left {@code UNSENT}.</li>
     * </ul>
     *
     * <p>NOTE(review): the previous body ended with {@code return successCount > 0;} although
     * the method is declared {@code void}. The declared signature is kept and the value is
     * dropped — confirm against the interface this method overrides.
     *
     * @param channelConfig channel configuration; not read by this method
     * @param params        the push request
     * @throws IllegalArgumentException if {@code params} is null
     */
    @Override
    public void send(MsgChannelConfigVO channelConfig, PushSingleParams params) {
        Assert.notNull(params, "params不能为空");
        // Declared outside the try block so the error path and the persistence step can see them.
        MessagePushRecords record = new MessagePushRecords();
        record.setState(MsgSentStatus.UNSENT.ordinal());
        MsgMessageContentEntity content = new MsgMessageContentEntity();
        // NOTE(review): nothing in this method fills these sets, so batchSend currently
        // returns 0 without sending. Receiver resolution still has to be implemented.
        Set<Object> userSet = new HashSet<>();
        Set<String> openIdSet = new HashSet<>();

        try {
            validatePushParams(params);
            resolvePushContent(params, record, content);
        } catch (Exception e) {
            record.setState(MsgSentStatus.ERROR.ordinal());
            record.setRemark(e.getMessage());
            log.error("推送错误:", e);
        }
        // Runs on both the success and the error path, as the former finally block did.
        savePushContentAndRecord(params, record, content);

        if (record.getState().intValue() != MsgSentStatus.UNSENT.ordinal()) {
            return;
        }
        Date delayTime = record.getDelayTime();
        if (delayTime == null || delayTime.getTime() <= System.currentTimeMillis()) {
            // No delay, or the delay has already elapsed: send now.
            record.setState(MsgSentStatus.SENDING.ordinal());
            MessagePushRecords modify = new MessagePushRecords();
            modify.setBizId(record.getBizId());
            modify.setState(record.getState());
            messagePushRecordsService.updateById(modify);

            int successCount = batchSend(null, userSet, openIdSet, record, content);

            record.setState(MsgSentStatus.COMPLETED.ordinal());
            modify.setSuccessCount(successCount);
            modify.setState(record.getState());
            messagePushRecordsService.updateById(modify);
        } else if (DateUtil.isSameDay(new Date(), delayTime)) {
            // Scheduled for later today: goes straight onto the delay queue.
            addDelayQueue(record);
        }
    }

    /**
     * Checks the mandatory fields of a push request.
     *
     * @throws IllegalArgumentException if a check fails
     */
    private void validatePushParams(PushSingleParams params) {
        Assert.notNull(params.getLevel(), "level不能为空");
        Assert.notNull(params.getAppId(), "appId不能为空");
        Assert.notNull(params.getPushMode(), "pushMode不能为空");
        Assert.notEmpty(params.getPlatform(), "platform不能为空");
        Assert.isTrue(!(CollectionUtils.isEmpty(params.getReceiverIds()) && CollectionUtils.isEmpty(params.getReceiverTags())), "接受者不能为空");
        Assert.isTrue(MsgLevel.isInclude(params.getLevel()), "level不匹配");
        Assert.isTrue(MsgContentType.isInclude(params.getPushMode()), "push不匹配");
        for (Integer platform : params.getPlatform()) {
            Assert.isTrue(MsgPushPlatform.isInclude(platform), "platform不匹配");
        }
        // NOTE(review): if getPushMode() returns an Integer code rather than a MsgContentType,
        // this equals() is never true and title/content are never checked — confirm the type.
        if (MsgContentType.NORMAL.equals(params.getPushMode())) {
            Assert.hasText(params.getTitle(), "title不能为空");
            // The message used to read "pushMode不能为空", which named the wrong field.
            Assert.hasText(params.getContent(), "content不能为空");
        }
    }

    /**
     * Fills the title and body of {@code content}, rendering the template when a template code
     * is supplied, and copies the template data onto {@code record}.
     */
    private void resolvePushContent(PushSingleParams params, MessagePushRecords record, MsgMessageContentEntity content) {
        if (StringUtils.isEmpty(params.getTemplateCode())) {
            content.setTitle(params.getTitle());
            content.setContent(params.getContent());
            return;
        }
        record.setTplCode(params.getTemplateCode());
        MsgMessageTemplateEntity tpl = msgTemplateService.getByTplCode(record.getTplCode());
        Assert.notNull(tpl, "模板信息不存在");
        record.setTplId(tpl.getTplId());
        record.setTplContent(tpl.getTemplateContent());
        // NOTE(review): getTemplateParams() is carried over from the undeclared `message`
        // variable — confirm it exists on PushSingleParams.
        record.setTplParams(JSONObject.toJSONString(params.getTemplateParams()));
        // An explicit title wins; otherwise the template name is used as the title.
        content.setTitle(StringUtils.isEmpty(params.getTitle()) ? tpl.getTemplateName() : params.getTitle());
        content.setContent(StrUtil.format(record.getTplContent(), JSONObject.parseObject(record.getTplParams())));
    }

    /**
     * Saves the message content (only when a title was resolved) and then the push record.
     *
     * <p>NOTE(review): the previous code read an undeclared {@code message} variable here.
     * getLevel, getAppId, getPlatform and getReceiverIds are known to exist on
     * PushSingleParams because the validation code calls them. getType, getExtras,
     * getServiceId, getSourceId, getSenderId, getReceiverType and getTiming are carried over
     * from the old calls and must be confirmed against PushSingleParams.
     */
    private void savePushContentAndRecord(PushSingleParams params, MessagePushRecords record, MsgMessageContentEntity content) {
        if (!StringUtils.isEmpty(content.getTitle())) {
            content.setLevel(params.getLevel());
            content.setType(params.getType());
            content.setExtras(JSONObject.toJSONString(params.getExtras()));
            messageContentService.save(content);
        }
        // Validation accepts a request that carries only receiver tags, and a request that
        // failed validation may carry neither, so the ID list can be null here.
        Collection<?> receivers = params.getReceiverIds();
        int receiverCount = receivers == null ? 0 : receivers.size();

        record.setMsgId(content.getMsgId());
        record.setAppId(params.getAppId());
        record.setPlatform(params.getPlatform() == null ? null : StrUtil.join(",", params.getPlatform()));
        record.setServiceId(params.getServiceId());
        record.setSourceId(params.getSourceId());
        record.setSenderId(params.getSenderId());
        record.setIsBatch(receiverCount > 1 ? 1 : 0);
        record.setReceiverType(params.getReceiverType());
        record.setReceiverList(receivers == null ? null : StrUtil.join(",", receivers));
        record.setSuccessCount(0);
        record.setTotalCount(receiverCount);
        record.setDeleted(0);
        record.setDelayTime(params.getTiming());
        record.setCreateTime(new Date());
        record.setUpdateTime(record.getCreateTime());
        messagePushRecordsService.save(record);
    }

    /**
     * Sends the push stored under the given push-record ID. Only a record whose state is
     * {@code WAITING} is processed.
     *
     * <p>The record is moved to {@code SENDING}, handed to batchSend, and then moved to
     * {@code COMPLETED} together with the success count that batchSend reported.
     *
     * <p>NOTE(review): receiver resolution is not implemented. The previous code parsed
     * {@code record.getReceiverList()} (comma-separated numeric IDs) for the USER and DEPT
     * receiver types but discarded the result, and had empty branches for ROLE,
     * WORKSPACE_SCOPE and the WeChat openId lookup. The receiver sets passed to batchSend are
     * therefore always empty and this method currently always returns {@code false}.
     * That dead code has been removed; the actual lookups still have to be written.
     *
     * @param bizId ID of the push record
     * @return {@code true} if batchSend reported a success count greater than zero;
     *         {@code false} otherwise, including when the record or its content is missing
     *         or the record is not in state {@code WAITING}
     */
    @Override
    public boolean sendDelayed(Long bizId) {
        MessagePushRecords record = messagePushRecordsService.getById(bizId);
        if (record == null) {
            return false;
        }
        // A null state is treated like any other non-waiting state instead of failing on unboxing.
        Integer state = record.getState();
        if (state == null || state != MsgSentStatus.WAITING.ordinal()) {
            return false;
        }
        MsgMessageContentEntity messageContent = messageContentService.getById(record.getMsgId());
        if (messageContent == null) {
            return false;
        }

        // TODO: resolve user IDs by receiver type, and WeChat openIds when the WeChat
        // platform is targeted (see the note in the method documentation).
        Set<Object> userSet = new HashSet<>();
        Set<String> openIdSet = new HashSet<>();

        // Mark as sending.
        record.setState(MsgSentStatus.SENDING.ordinal());
        MessagePushRecords modify = new MessagePushRecords();
        modify.setBizId(record.getBizId());
        modify.setState(record.getState());
        messagePushRecordsService.updateById(modify);

        int successCount = batchSend(null, userSet, openIdSet, record, messageContent);

        // Mark as completed and store the reported success count.
        record.setState(MsgSentStatus.COMPLETED.ordinal());
        modify.setSuccessCount(successCount);
        modify.setState(record.getState());
        messagePushRecordsService.updateById(modify);
        return successCount > 0;
    }

    /**
     * Loads {@code UNSENT} push records ahead of time and offers each one to the delay queue.
     *
     * <p>The query passes {@code before} (the number of days) and {@code state} to
     * {@code messagePushRecordsService.findList}; how {@code before} is interpreted is
     * decided by that service.
     *
     * @param beforeDay number of days to look ahead; {@code null} means 1
     * @return the number of records the query returned, or 0 when it returned none. This is
     *         not necessarily the number actually queued, because addDelayQueue skips
     *         records without an ID or delay time and records whose delay time has passed.
     */
    @Override
    public Integer beforeLoadDelayQueue(Integer beforeDay) {
        int days = beforeDay == null ? 1 : beforeDay;
        Map<String, Object> query = new HashMap<>();
        query.put("before", days);
        query.put("state", MsgSentStatus.UNSENT.ordinal());
        List<MessagePushRecords> pending = messagePushRecordsService.findList(query);
        // The previous code guarded the loop against null but then called size() on the
        // same reference unconditionally.
        if (pending == null || pending.isEmpty()) {
            return 0;
        }
        pending.forEach(this::addDelayQueue);
        return pending.size();
    }


    /**
     * Splits the receivers into batches of at most {@link #SIZE}, processes every batch on
     * the worker pool, waits for all of them, and then publishes a {@code PUSH_DONE} topic
     * message carrying the push record.
     *
     * <p>Returns 0 immediately, without publishing anything, when {@code users} is null or empty.
     *
     * <p>NOTE(review): each batch task currently reports 1 regardless of what it delivered,
     * so the returned value is the number of batches that ran, not the number of receivers
     * reached. See also the notes in {@code sendBatch}.
     *
     * @param sender  the sending party; only tested for null (null is labelled "系统")
     * @param users   the receivers; only the size is used at the moment
     * @param openIds WeChat openIds of the receivers; currently unused
     * @param record  the push record being processed
     * @param message the message content to deliver
     * @return the sum of the values reported by the batch tasks
     */
    private int batchSend(final Object sender, Set<?> users, Set<String> openIds, final MessagePushRecords record, final MsgMessageContentEntity message) {
        final String channel = pushProperties.getChannel();
        if (users == null || users.isEmpty()) {
            return 0;
        }
        final long begin = System.nanoTime();
        final int totalCount = users.size();
        // Ceiling division: the last batch may be smaller than SIZE.
        final int batchCount = (totalCount + SIZE - 1) / SIZE;
        final List<Future<Integer>> futures = new ArrayList<>(batchCount);

        for (int i = 0; i < batchCount; i++) {
            final int beginIndex = i * SIZE;
            final int endIndex = Math.min(beginIndex + SIZE, totalCount);
            Callable<Integer> task = () -> sendBatch(sender, record, message, channel, beginIndex, endIndex);
            futures.add(executorService.submit(task));
        }

        if (log.isDebugEnabled()) {
            log.debug("-----------------------" + (System.nanoTime() - begin) / 1000_000d + "-----------------------");
        }

        // All batches are submitted; wait for each one and add up what it reported.
        int successCount = 0;
        for (Future<Integer> future : futures) {
            try {
                successCount += future.get();
            } catch (InterruptedException e) {
                // Restore the flag for the caller and stop waiting; further get() calls
                // would fail the same way.
                Thread.currentThread().interrupt();
                log.error("等待批量发送结果被中断", e);
                break;
            } catch (ExecutionException e) {
                log.error("批量发送任务执行失败", e);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("发送消息结束，耗时：" + (System.nanoTime() - begin) / 1000_000d);
        }

        TopicParams doneParams = new TopicParams();
        doneParams.add(new TopicMessage(record.getTenantId(), TopicType.PUSH_DONE.getRouteKey(), TopicType.PUSH_DONE.getAction(), record));
        MqUtil.topic(amqpTemplate, doneParams);
        return successCount;
    }

    /**
     * Processes one batch: builds per-platform detail rows, pushes to the APP channel, saves
     * the detail rows and inbox entries, and publishes the collected websocket messages.
     * Exceptions are logged and never propagated.
     *
     * <p>NOTE(review): the slice for this batch is never taken. The original line
     * {@code users.subList(beginIndex, endIndex)} was commented out, so {@code batch} is
     * always empty and none of the per-receiver work below runs.
     *
     * @return always 1 (kept from the previous implementation — confirm the intended value)
     */
    private Integer sendBatch(Object sender, MessagePushRecords record, MsgMessageContentEntity message,
                              String channel, int beginIndex, int endIndex) {
        try {
            List<Object> batch = new ArrayList<>();
            List<MessagePushDetails> webDetails = new ArrayList<>();
            List<MessagePushDetails> appDetails = new ArrayList<>();
            List<MessagePushDetails> wechatDetails = new ArrayList<>();
            TopicParams webTopics = new TopicParams();
            Date sendTime = new Date();
            if (log.isDebugEnabled()) {
                log.debug("是否开启APP推送:【{}】", pushProperties.isEnabled());
                log.debug("是否开启微信推送:【{}】", wechatProperties.isEnabled());
            }

            // One detail row per receiver and targeted platform.
            // NOTE(review): the platform list is matched by substring, so an ordinal such
            // as "1" would also match "10" — confirm the stored format.
            for (Object user : batch) {
                String platforms = record.getPlatform();
                if (platforms.contains(String.valueOf(MsgPushPlatform.APP.ordinal()))) {
                    appDetails.add(newPushDetail(record, MsgPushPlatform.APP, sendTime));
                }
                if (platforms.contains(String.valueOf(MsgPushPlatform.WEB.ordinal()))) {
                    webDetails.add(newPushDetail(record, MsgPushPlatform.WEB, sendTime));
                    MessageInboxResult inboxMsgResult = new MessageInboxResult();
                    inboxMsgResult.setMsgId(record.getMsgId());
                    inboxMsgResult.setExtras(message.getExtras());
                    inboxMsgResult.setType(message.getType());
                    inboxMsgResult.setLevel(message.getLevel());
                    inboxMsgResult.setTitle(message.getTitle());
                    inboxMsgResult.setContent(message.getContent());
                    inboxMsgResult.setSenderId(record.getSenderId());
                    inboxMsgResult.setSendTime(record.getCreateTime());
                    // A non-null sender was handled by an empty branch before; no name is set for it.
                    if (sender == null) {
                        inboxMsgResult.setSenderName("系统");
                    }
                    webTopics.add(new TopicMessage("", TopicType.WS_SEND.getRouteKey(), TopicType.WS_SEND.getAction(), inboxMsgResult));
                }
                if (platforms.contains(String.valueOf(MsgPushPlatform.WECHAT.ordinal()))) {
                    wechatDetails.add(newPushDetail(record, MsgPushPlatform.WECHAT, sendTime));
                }
            }

            if (pushProperties.isEnabled() && !appDetails.isEmpty()) {
                pushToApp(message, channel, appDetails);
            }

            try {
                if (!webDetails.isEmpty()) {
                    messagePushDetailsService.saveBatch(webDetails);
                }
                if (!appDetails.isEmpty()) {
                    messagePushDetailsService.saveBatch(appDetails);
                }
                if (!wechatDetails.isEmpty()) {
                    messagePushDetailsService.saveBatch(wechatDetails);
                }

                if (!batch.isEmpty()) {
                    // One unread inbox entry per receiver.
                    List<MsgMessageReadEntity> inboxList = new ArrayList<>(batch.size());
                    for (Object user : batch) {
                        MsgMessageReadEntity inbox = new MsgMessageReadEntity();
                        inbox.setMsgId(record.getMsgId());
                        inbox.setReadState(0);
                        inbox.setDeleted(0);
                        inbox.setSenderId(record.getSenderId());
                        inbox.setSendTime(record.getCreateTime());
                        inboxList.add(inbox);
                    }
                    messageInboxService.saveBatch(inboxList);
                }

                // Publish the websocket messages collected for the WEB platform.
                MqUtil.topic(amqpTemplate, webTopics);
            } catch (Exception e) {
                log.error("写入消息收件箱错误:", e);
            }
            if (log.isDebugEnabled()) {
                log.debug("批量发送list范围: {}-{} 发送数量:【{}】", beginIndex, endIndex, batch.size());
            }
        } catch (Exception e) {
            log.error("发送失败", e);
        }
        return 1;
    }

    /**
     * Creates a detail row for one platform, pre-marked as successful (result 1) and not deleted.
     */
    private MessagePushDetails newPushDetail(MessagePushRecords record, MsgPushPlatform platform, Date sendTime) {
        MessagePushDetails detail = new MessagePushDetails();
        detail.setBizId(record.getBizId());
        detail.setSendTime(sendTime);
        detail.setPlatform(platform.ordinal());
        detail.setResult(1);
        detail.setDeleted(0);
        return detail;
    }

    /**
     * Pushes the message through the configured APP channel and writes the request, the
     * response and the outcome onto every APP detail row.
     *
     * <p>NOTE(review): {@code app} is not declared anywhere in the visible source, and the
     * alias array is allocated but never filled — both need attention before this can work.
     */
    private void pushToApp(MsgMessageContentEntity message, String channel, List<MessagePushDetails> appDetails) {
        JSONObject requestParams = new JSONObject();
        JSONObject responseResult = new JSONObject();
        boolean pushFailed = false;
        try {
            Map<String, String> extras = new HashMap<>();
            String[] alias = new String[2];
            String title = message.getTitle();
            String alert = message.getContent();
            extras.put("msgId", message.getMsgId().toString());
            extras.put("type", message.getType().toString());
            extras.put("level", message.getLevel().toString());
            requestParams.put("extras", extras);
            requestParams.put("alias", alias);
            requestParams.put("title", title);
            requestParams.put("alert", alert);
            requestParams.put("channel", channel);
            if ("jpush".equals(channel)) {
                String response = jPushService.sendPush(app, alias, null, title, alert, extras);
                responseResult = JSONObject.parseObject(response);
            } else {
                String response = gPushPushService.sendPush(app, alias, null, title, alert, extras);
                if (response != null) {
                    responseResult = JSONObject.parseObject(response);
                }
            }
        } catch (Exception e) {
            pushFailed = true;
            for (MessagePushDetails detail : appDetails) {
                detail.setResult(0);
                detail.setChannel(channel);
                detail.setRemark((detail.getRemark() == null ? "" : detail.getRemark() + ",") + "错误信息：" + e.getMessage());
            }
            log.error("APP推送错误:", e);
        } finally {
            String requestJson = requestParams.toJSONString();
            for (MessagePushDetails detail : appDetails) {
                detail.setRequestParams(requestJson);
                if (responseResult == null) {
                    continue;
                }
                detail.setResponseResult(responseResult.toJSONString());
                // When the push threw, the response is still the empty default object.
                // Deriving the result from it would turn the failure into a success for
                // jpush, because a missing statusCode reads as 0. Keep the failure instead.
                if (pushFailed) {
                    continue;
                }
                if ("jpush".equals(channel)) {
                    detail.setResult(responseResult.getIntValue("statusCode") == 0 ? 1 : 0);
                } else {
                    detail.setResult("success".equals(responseResult.getString("result")) ? 1 : 0);
                }
            }
        }
    }

    /**
     * Publishes a delayed message for the given push record and stores the outcome on the record.
     *
     * <p>Nothing happens when the record is null, has no ID or delay time, or when its delay
     * time is already in the past. Otherwise a delayed message carrying only the record ID is
     * published with the remaining delay in milliseconds, and the record state is updated to
     * {@code WAITING} on success or to {@code ERROR} (with the exception message as remark)
     * on failure.
     *
     * @param record the push record to schedule
     */
    public void addDelayQueue(final MessagePushRecords record) {
        if (record == null || record.getBizId() == null || record.getDelayTime() == null) {
            return;
        }
        long delayMillis = record.getDelayTime().getTime() - System.currentTimeMillis();
        if (delayMillis < 0) {
            return;
        }
        log.debug("定时推送记录：{}，延迟：{}/ms", record.getBizId(), delayMillis);
        MessagePushRecords modify = new MessagePushRecords();
        modify.setBizId(record.getBizId());
        try {
            // Only the ID goes into the payload to keep the queued message small.
            JSONObject payload = new JSONObject();
            payload.put("bizId", record.getBizId());
            DelayedMessage delayedMessage = new DelayedMessage(record.getTenantId(), QueueConfiguration.DELAYED_PUSH_MSG_QUEUE_RK, null, delayMillis, payload);
            MqUtil.delayed(amqpTemplate, delayedMessage);
            modify.setState(MsgSentStatus.WAITING.ordinal());
        } catch (Exception e) {
            // The remark only keeps the message text; log the exception so the stack trace is not lost.
            log.error("加入延迟队列失败, bizId={}", record.getBizId(), e);
            modify.setState(MsgSentStatus.ERROR.ordinal());
            modify.setRemark(e.getMessage());
        }
        messagePushRecordsService.updateById(modify);
    }


}
